CosmosDbObservableExtensions Class
CosmosDbObservableExtensions Class
The CosmosDbObservableExtensions class provides observable extensions for CosmosDB operations.
In particular, it offers observable versions of common CosmosDB SDK methods that:
- provide logging of Queries and CRUD operations together with their payload sizes, the databases and collections they are exetuted with and, optionally, their full request and response payloads.
- provides metrics to OpenTelemetry about Latencies, inbound and oudbound data volumes and Request Units (RU) consumption for every operation. such metrics are tagged with database, container, operation type and caller information to allow detailed performance analysis.
CosmosDbObservableExtensions is part of the Diginsight.Components.Azure package.
Table of Contents
📋 Overview
The CosmosDbObservableExtensions class provides observable versions of standard CosmosDB SDK methods, automatically adding:
- Structured Logging: Detailed logs with connection info, queries, and results
- OpenTelemetry Activities: Distributed tracing with custom tags
- Error Handling: Comprehensive exception logging and re-throwing
- Performance Metrics: Automatic RU consumption tracking
- Context Enrichment: Container, database, and endpoint information
Key Features
- Zero Configuration: Works out-of-the-box with existing CosmosDB code
- Non-Intrusive: Drop-in replacements for standard SDK methods
- Rich Telemetry: Comprehensive observability without manual instrumentation
- Error Resilience: Graceful error handling without impact on application flow
- Query Cost Tracking: Automatic Request Unit (RU) consumption monitoring
- Structured Data: JSON serialization of entities and responses
Observable Operations
All major CosmosDB operations have observable counterparts:
| Category | Operations | Count |
|---|---|---|
| Query | FeedIterator, LINQ Queryable | 8 methods |
| CRUD | Create, Read, Upsert, Replace, Delete | 10 methods |
| Batch | ReadMany, DeleteAllByPartitionKey, TransactionalBatch | 5 methods |
| Patch | PatchItem operations | 2 methods |
| Utility | AsyncEnumeration, Response reading | 3 methods |
🔍 Additional Details
Synchronous vs Asynchronous Operations
The observable extensions maintain the same patterns as the original CosmosDB SDK while adding comprehensive telemetry. Every observable operation creates rich diagnostic information including:
Operation Context: - Database endpoint and authentication details - Container and partition information - Request timing and concurrency patterns
Performance Metrics: - Request Units (RU) consumption per operation - Latency measurements and distribution - Throughput and operation success rates
Query Analysis: - Full query text with parameter values - Query execution plans and optimization hints - Result set sizes and pagination information
Activity Tracking
Every observable operation creates an OpenTelemetry activity with rich context:
Standard Activity Tags: - query: SQL query text or operation type - container: CosmosDB container name - database: CosmosDB database name
- endpoint: CosmosDB endpoint URI - operation_type: Type of operation (query, read, create, etc.)
Query Cost Integration: - query_cost: Request Units consumed (when available) - Automatic integration with QueryCostMetricRecorder
Example Activity:
Activity: CosmosDbObservableExtensions.GetItemQueryIteratorObservable
- query: "SELECT * FROM c WHERE c.Status = @status"
- container: "users"
- database: "myapp"
- endpoint: "https://myaccount.documents.azure.com:443/"
- query_cost: 12.45
Logging
Comprehensive structured logging with consistent patterns across all operations:
Query Operations
🔍 CosmosDB query for class 'User' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
🔍 Query: "SELECT * FROM c WHERE c.Status = 'Active'"
Item Operations
📦 CosmosDB create item for class 'User' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
📦 entity:{"Id":"user123","Name":"John Doe","Status":"Active"}
🔍 CosmosDB read item for id 'user123' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
🔍 partitionKey:["users"]
🔄 CosmosDB upsert for class 'User' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
🔄 entity:{"Id":"user123","Name":"John Updated","Status":"Active"}
🗑️ CosmosDB delete item for class 'User' with id 'user123' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
🗑️ partitionKey:["users"]
Patch Operations
✂️ CosmosDB patch item for class 'User' with id 'user123' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
✂️ partitionKey:["users"]
✂️ patchOperations:Replace /status "Inactive", Set /modifiedDate "2023-12-01T10:30:00Z"
Error Logging
❌ Error creating item in CosmosDB for type User: Request rate is large. ActivityId: 12345678-1234-1234-1234-123456789012
Metrics
When integrated with QueryCostMetricRecorder, observable operations automatically contribute to:
diginsight.query_costhistogram: Request Unit consumption tracking- Custom tags: Method names, callers, container, and database information
- Query normalization: Reduced cardinality through query pattern analysis
Error Handling
Comprehensive error handling with detailed logging:
try
{
var response = await container.CreateItemObservableAsync(duplicateUser);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
{
// Conflict automatically logged:
// ❌ Error creating item in CosmosDB for type User: Item with the specified id already exists
// Handle duplicate item scenario
await HandleDuplicateItemAsync(duplicateUser);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
// Rate limiting automatically logged:
// ❌ Error creating item in CosmosDB for type User: Request rate is large
// Implement retry logic
await Task.Delay(ex.RetryAfter ?? TimeSpan.FromSeconds(1));
return await container.CreateItemObservableAsync(duplicateUser);
}Error Categories Handled: - Rate Limiting (429): Too Many Requests with retry guidance - Conflicts (409): Item already exists scenarios
- Not Found (404): Missing items or containers - Timeout: Request timeout scenarios - General Exceptions: Network, authentication, and service errors
⚙️ Configuration
Basic Usage
Replace standard CosmosDB SDK method calls with their observable counterparts by adding Observable to the method name:
// Standard SDK
var iterator = container.GetItemQueryIterator<User>("SELECT * FROM c WHERE c.Type = 'Active'");
// Observable version
var iterator = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c WHERE c.Type = 'Active'");
// Standard SDK
var response = await container.ReadItemAsync<User>("user123", new PartitionKey("users"));
// Observable version
var response = await container.ReadItemObservableAsync<User>("user123", new PartitionKey("users"));Initial Setup
Ensure Diginsight observability is configured in your application:
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// Add Diginsight observability
builder.Services.AddObservability();
// Configure OpenTelemetry (optional - for metrics)
builder.Services.AddOpenTelemetry()
.WithMetrics(metrics => metrics.AddMeter("Diginsight.Components.Azure"));
var app = builder.Build();Dependency Injection
Register CosmosDB services with observable extensions:
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// Add Diginsight observability
builder.Services.AddObservability();
// Add CosmosDB with observable extensions
builder.Services.AddSingleton(sp =>
{
var connectionString = builder.Configuration.GetConnectionString("CosmosDB");
return new CosmosClient(connectionString);
});
builder.Services.AddScoped<IUserRepository, UserRepository>();
// UserRepository.cs
public class UserRepository : IUserRepository
{
private readonly Container _container;
public UserRepository(CosmosClient cosmosClient)
{
_container = cosmosClient.GetContainer("MyApp", "users");
}
public async Task<User> GetUserAsync(string id)
{
// Use observable extension - automatic telemetry
var response = await _container.ReadItemObservableAsync<User>(
id: id,
partitionKey: new PartitionKey("users")
);
return response.Resource;
}
public async IAsyncEnumerable<User> GetActiveUsersAsync()
{
var iterator = _container.GetItemQueryIteratorObservable<User>(
"SELECT * FROM c WHERE c.Status = 'Active'"
);
// Use async enumeration utility
await foreach (var user in iterator.GetAsyncItems())
{
yield return user;
}
}
}OpenTelemetry Setup
Configure OpenTelemetry to capture all observable telemetry:
// Program.cs
builder.Services.AddOpenTelemetry()
.WithTracing(tracing =>
{
tracing.AddAspNetCoreInstrumentation();
tracing.AddHttpClientInstrumentation();
// Add Diginsight activity sources
tracing.AddSource("Diginsight.Components.Azure");
// Configure exporters
tracing.AddConsoleExporter();
tracing.AddJaegerExporter();
})
.WithMetrics(metrics =>
{
metrics.AddAspNetCoreInstrumentation();
metrics.AddHttpClientInstrumentation();
// Add Diginsight meters for query cost tracking
metrics.AddMeter("Diginsight.Components.Azure");
// Configure exporters
metrics.AddConsoleExporter();
metrics.AddPrometheusExporter();
});
// Add query cost metric recording
builder.Services.AddCosmosDbQueryCostMetricRecorder(options =>
{
options.AddNormalizedQueryTag = true; // Include query patterns in metrics
options.AddQueryCallers = 2; // Include caller context
});Logging Configuration
Configure logging to capture observable operations:
// appsettings.json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information",
// Enable CosmosDB observable logging
"Diginsight.Components.Azure": "Debug",
"Diginsight.Components.Azure.Extensions.CosmosDbObservableExtensions": "Information"
}
}
}
// Program.cs - Configure structured logging
builder.Services.Configure<LoggerFilterOptions>(options =>
{
// Show CosmosDB operations at Info level
options.AddFilter("Diginsight.Components.Azure.Extensions", LogLevel.Information);
// Show query cost metrics at Debug level
options.AddFilter("Diginsight.Components.Azure.Metrics", LogLevel.Debug);
});💡 Usage Examples
Query Operations
ToFeedIteratorObservable()
Creates an observable FeedIterator from an IQueryable with automatic telemetry.
// Method signature
public static FeedIterator<T> ToFeedIteratorObservable<T>(this IQueryable<T> query, Container? container = null)
public static FeedIterator<T> ToFeedIteratorObservable<T>(this Container container, IQueryable<T> query)
// Usage
var query = container.GetItemLinqQueryable<User>()
.Where(u => u.Status == "Active")
.OrderBy(u => u.CreatedDate);
var iterator = query.ToFeedIteratorObservable(container);
// or
var iterator = container.ToFeedIteratorObservable(query);Telemetry Added: - Activity with query text and container information - Log messages with query details - Automatic query cost tracking when used with ReadNextObservableAsync()
GetItemQueryIteratorObservable()
Observable version of GetItemQueryIterator<T>() with multiple overloads for different query scenarios.
// String query
var iterator = container.GetItemQueryIteratorObservable<User>(
query: "SELECT * FROM c WHERE c.Status = @status",
continuationToken: null,
requestOptions: new QueryRequestOptions
{
PartitionKey = new PartitionKey("users"),
MaxItemCount = 100
}
);
// QueryDefinition
var queryDef = new QueryDefinition("SELECT * FROM c WHERE c.Status = @status")
.WithParameter("@status", "Active");
var iterator = container.GetItemQueryIteratorObservable<User>(
queryDefinition: queryDef,
continuationToken: null,
requestOptions: requestOptions
);
// With FeedRange for parallel processing
var iterator = container.GetItemQueryIteratorObservable<User>(
feedRange: feedRange,
queryDefinition: queryDef,
continuationToken: token,
requestOptions: requestOptions
);Logged Information: - Database endpoint and container details - Full query text or QueryDefinition - Query parameters (when using QueryDefinition)
GetItemQueryStreamIteratorObservable()
Observable version for stream-based query operations, useful for large result sets or custom deserialization.
// String query
var streamIterator = container.GetItemQueryStreamIteratorObservable(
query: "SELECT * FROM c",
continuationToken: null,
requestOptions: requestOptions
);
// QueryDefinition
var streamIterator = container.GetItemQueryStreamIteratorObservable(
queryDefinition: queryDef,
continuationToken: null,
requestOptions: requestOptions
);
// Process stream results
while (streamIterator.HasMoreResults)
{
using var response = await streamIterator.ReadNextAsync();
// Custom stream processing
}GetItemLinqQueryableObservable()
Creates observable LINQ queryables with optional transformation functions.
// Basic queryable
var queryable = container.GetItemLinqQueryableObservable<User>(
allowSynchronousQueryExecution: false,
continuationToken: null,
requestOptions: requestOptions,
linqSerializerOptions: null
);
// With transformation
var filteredQueryable = container.GetItemLinqQueryableObservable<User>(
transform: q => q.Where(u => u.Status == "Active").OrderBy(u => u.Name),
allowSynchronousQueryExecution: false,
continuationToken: null,
requestOptions: requestOptions
);Use Cases: - Building dynamic LINQ queries - Complex filtering and sorting - Query composition patterns
Item Operations
CreateItemObservableAsync()
Observable version of CreateItemAsync<T>() with comprehensive logging and error handling.
var newUser = new User
{
Id = "user123",
Name = "John Doe",
Status = "Active"
};
var response = await container.CreateItemObservableAsync(
item: newUser,
partitionKey: new PartitionKey("users"),
requestOptions: new ItemRequestOptions { EnableContentResponseOnWrite = true },
cancellationToken: cancellationToken
);
Console.WriteLine($"Created item with RU cost: {response.RequestCharge}");Telemetry Features: - Logs full entity details using Stringify() extension - Tracks creation success/failure - Records RU consumption - Activity output includes full response details
ReadItemObservableAsync()
Observable version of ReadItemAsync<T>() with detailed logging.
try
{
var response = await container.ReadItemObservableAsync<User>(
id: "user123",
partitionKey: new PartitionKey("users"),
requestOptions: new ItemRequestOptions { ConsistencyLevel = ConsistencyLevel.Session },
cancellationToken: cancellationToken
);
var user = response.Resource;
Console.WriteLine($"Read user: {user.Name}, RU cost: {response.RequestCharge}");
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
// Item not found - automatically logged by observable extension
Console.WriteLine("User not found");
}UpsertItemObservableAsync()
Observable version of UpsertItemAsync<T>() for insert-or-update operations.
var user = new User
{
Id = "user123",
Name = "John Updated",
Status = "Active",
ModifiedDate = DateTime.UtcNow
};
var response = await container.UpsertItemObservableAsync(
item: user,
partitionKey: new PartitionKey("users"),
requestOptions: new ItemRequestOptions
{
IndexingDirective = IndexingDirective.Include,
EnableContentResponseOnWrite = true
},
cancellationToken: cancellationToken
);
bool wasCreated = response.StatusCode == HttpStatusCode.Created;
bool wasUpdated = response.StatusCode == HttpStatusCode.OK;ReplaceItemObservableAsync()
Observable version of ReplaceItemAsync<T>() for updating existing items.
var updatedUser = existingUser with { Status = "Inactive", ModifiedDate = DateTime.UtcNow };
var response = await container.ReplaceItemObservableAsync(
item: updatedUser,
id: updatedUser.Id,
partitionKey: new PartitionKey("users"),
requestOptions: new ItemRequestOptions
{
IfMatchEtag = existingUser.ETag // Optimistic concurrency
},
cancellationToken: cancellationToken
);DeleteItemObservableAsync()
Observable version of DeleteItemAsync<T>() with deletion logging.
var response = await container.DeleteItemObservableAsync<User>(
id: "user123",
partitionKey: new PartitionKey("users"),
requestOptions: new ItemRequestOptions
{
IfMatchEtag = user.ETag // Ensure no concurrent modifications
},
cancellationToken: cancellationToken
);
Console.WriteLine($"Deleted user, RU cost: {response.RequestCharge}");Stream Operations
For high-performance scenarios, stream-based operations avoid object serialization overhead:
// Create with stream
using var stream = new MemoryStream(JsonSerializer.SerializeToUtf8Bytes(user));
var response = await container.CreateItemStreamObservableAsync(
streamPayload: stream,
partitionKey: new PartitionKey("users"),
requestOptions: requestOptions,
cancellationToken: cancellationToken
);
// Read with stream
var streamResponse = await container.ReadItemStreamObservableAsync(
id: "user123",
partitionKey: new PartitionKey("users"),
requestOptions: requestOptions,
cancellationToken: cancellationToken
);
if (streamResponse.IsSuccessStatusCode)
{
var user = await JsonSerializer.DeserializeAsync<User>(streamResponse.Content);
}Batch Operations
CreateTransactionalBatchObservable()
Creates an observable transactional batch for atomic operations within a single partition.
// Create a transactional batch for a specific partition
var batch = container.CreateTransactionalBatchObservable(new PartitionKey("user-partition"));
// Add operations to the batch
batch.CreateItem(newUser);
batch.UpsertItem(existingUser);
batch.DeleteItem("item-to-delete");
// Execute the batch atomically
var response = await batch.ExecuteObservableAsync(cancellationToken);
if (response.IsSuccessStatusCode)
{
Console.WriteLine($"Batch executed successfully, RU cost: {response.RequestCharge}");
// Process individual operation results
for (int i = 0; i < response.Count; i++)
{
var operationResult = response[i];
Console.WriteLine($"Operation {i}: {operationResult.StatusCode}");
}
}ExecuteObservableAsync()
Executes a transactional batch with full observability.
var batch = container.CreateTransactionalBatch(new PartitionKey("partition-key"));
batch.CreateItem(item1);
batch.UpsertItem(item2);
// Execute with observable telemetry
var response = await batch.ExecuteObservableAsync(cancellationToken);
Console.WriteLine($"Batch execution completed with {response.RequestCharge} RU consumed");ReadManyItemsObservableAsync()
Efficiently read multiple items in a single request.
var itemsToRead = new List<(string id, PartitionKey partitionKey)>
{
("user1", new PartitionKey("users")),
("user2", new PartitionKey("users")),
("user3", new PartitionKey("users"))
};
var response = await container.ReadManyItemsObservableAsync<User>(
items: itemsToRead,
readManyRequestOptions: new ReadManyRequestOptions
{
ConsistencyLevel = ConsistencyLevel.Session
},
cancellationToken: cancellationToken
);
foreach (var user in response)
{
Console.WriteLine($"Read user: {user.Name}");
}
Console.WriteLine($"Read {response.Count} users with {response.RequestCharge} RU");DeleteAllItemsByPartitionKeyStreamObservableAsync()
Delete all items in a partition key efficiently.
var response = await container.DeleteAllItemsByPartitionKeyStreamObservableAsync(
partitionKey: new PartitionKey("inactive-users"),
requestOptions: new RequestOptions
{
IfMatchEtag = partitionETag // Optional optimistic concurrency
},
cancellationToken: cancellationToken
);
if (response.IsSuccessStatusCode)
{
Console.WriteLine($"Deleted all items in partition, RU cost: {response.Headers.RequestCharge}");
}Patch Operations
CosmosDB patch operations allow efficient partial updates:
PatchItemObservableAsync()
Observable version of PatchItemAsync<T>() for partial item updates.
var patchOperations = new[]
{
PatchOperation.Replace("/status", "Inactive"),
PatchOperation.Set("/modifiedDate", DateTime.UtcNow),
PatchOperation.Add("/tags/-", "archived") // Add to array
};
var response = await container.PatchItemObservableAsync<User>(
id: "user123",
partitionKey: new PartitionKey("users"),
patchOperations: patchOperations,
requestOptions: new PatchItemRequestOptions
{
IfMatchEtag = user.ETag,
EnableContentResponseOnWrite = true
},
cancellationToken: cancellationToken
);
var patchedUser = response.Resource;
Console.WriteLine($"Patched user, RU cost: {response.RequestCharge}");Patch Operation Types: - Replace: Update existing property value - Add: Add new property or append to array - Remove: Delete property or array element
- Set: Add property if missing, replace if exists - Increment: Numeric increment operation
Utility Methods
ReadNextObservableAsync()
Observable version of ReadNextAsync() for FeedIterator processing with enhanced telemetry.
var iterator = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
var response = await iterator.ReadNextObservableAsync(cancellationToken);
foreach (var user in response)
{
Console.WriteLine($"Processing user: {user.Name}");
}
Console.WriteLine($"Page RU cost: {response.RequestCharge}");
Console.WriteLine($"Continuation token: {response.ContinuationToken}");
}Enhanced Telemetry: - Detailed logging of page results and RU consumption - Automatic query cost metric recording (when configured) - Activity tags for performance monitoring
GetAsyncItems()
Convert FeedIterator to IAsyncEnumerable for modern async processing patterns.
var iterator = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c WHERE c.Status = 'Active'");
// Process items as they arrive
await foreach (var user in iterator.GetAsyncItems())
{
Console.WriteLine($"Processing user: {user.Name}");
// Process each user individually
await ProcessUserAsync(user);
}Benefits: - Memory efficient streaming processing - Compatible with LINQ async operators - Natural integration with async/await patterns
GetItemsAsync()
Materialize all results from a FeedIterator into a collection.
var iterator = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c");
// Get all items at once
var allUsers = await iterator.GetItemsAsync();
Console.WriteLine($"Retrieved {allUsers.Count()} users total");
// Process collection
var activeUsers = allUsers.Where(u => u.Status == "Active").ToList();Use Cases: - Small result sets that fit in memory - Scenarios requiring full collection operations - Integration with synchronous processing logic
🚀 Advanced Usage
Custom Request Options
All observable methods support the full range of CosmosDB request options:
// Query with custom options
var queryOptions = new QueryRequestOptions
{
PartitionKey = new PartitionKey("users"),
MaxItemCount = 50,
MaxConcurrency = 10,
EnableScanInQuery = false,
ConsistencyLevel = ConsistencyLevel.Session,
IndexingDirective = IndexingDirective.Include,
ResponseContinuationTokenLimitInKb = 1
};
var iterator = container.GetItemQueryIteratorObservable<User>(
query: "SELECT * FROM c WHERE c.Status = @status",
requestOptions: queryOptions
);
// Item operations with custom options
var itemOptions = new ItemRequestOptions
{
ConsistencyLevel = ConsistencyLevel.Strong,
EnableContentResponseOnWrite = true,
IfMatchEtag = existingItem.ETag,
IfNoneMatchEtag = "*", // Fail if item exists
IndexingDirective = IndexingDirective.Exclude,
PreTriggers = new List<string> { "validateItem" },
PostTriggers = new List<string> { "updateMetadata" }
};
var response = await container.CreateItemObservableAsync(
item: newItem,
partitionKey: partitionKey,
requestOptions: itemOptions
);Partition Key Management
Proper partition key handling for optimal performance:
// Explicit partition key specification
var explicitPK = new PartitionKey("user-partition");
var response = await container.ReadItemObservableAsync<User>("user123", explicitPK);
// Hierarchical partition keys (CosmosDB v3.21+)
var hierarchicalPK = new PartitionKeyBuilder()
.Add("tenant-id")
.Add("user-group")
.Add("region")
.Build();
var item = await container.ReadItemObservableAsync<User>("user123", hierarchicalPK);
// None partition key for single partition containers
var singlePartitionItem = await container.ReadItemObservableAsync<Config>(
id: "app-config",
partitionKey: PartitionKey.None
);Async Enumeration Patterns
Modern async processing with observable operations:
// Streaming processing with backpressure handling
var query = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c");
var processingOptions = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = cancellationToken
};
await foreach (var user in query.GetAsyncItems().WithCancellation(cancellationToken))
{
// Process users with controlled parallelism
await Parallel.ForEachAsync(
new[] { user },
processingOptions,
async (u, ct) => await ProcessUserAsync(u, ct)
);
}
// Batch processing with size control
const int batchSize = 100;
var users = new List<User>(batchSize);
await foreach (var user in query.GetAsyncItems())
{
users.Add(user);
if (users.Count >= batchSize)
{
await ProcessUserBatchAsync(users);
users.Clear();
}
}
// Process remaining users
if (users.Count > 0)
{
await ProcessUserBatchAsync(users);
}🔧 Troubleshooting
Common Issues
1. Missing Telemetry Data
Symptoms: No logs or activities from observable operations
Solutions:
// Ensure Diginsight observability is registered
builder.Services.AddObservability();
// Verify activity sources are configured
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing.AddSource("Diginsight.Components.Azure"));
// Check logging configuration
"Diginsight.Components.Azure": "Information"2. High Memory Usage with Large Queries
Symptoms: Memory exhaustion when processing large result sets
Solutions:
// Use async enumeration instead of materializing all results
await foreach (var item in iterator.GetAsyncItems())
{
// Process items individually
}
// Instead of:
var allItems = await iterator.GetItemsAsync(); // Loads everything into memory3. Rate Limiting Not Handled
Symptoms: Frequent 429 (Too Many Requests) exceptions
Solutions:
// Implement proper retry logic
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
await Task.Delay(ex.RetryAfter ?? TimeSpan.FromSeconds(1));
// Retry operation
}
// Or use CosmosDB SDK built-in retry
var clientOptions = new CosmosClientOptions
{
MaxRetryAttemptsOnRateLimitedRequests = 3,
MaxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds(10)
};4. Query Cost Metrics Not Appearing
Symptoms: Missing diginsight.query_cost metrics
Solutions:
// Ensure QueryCostMetricRecorder is registered
builder.Services.AddCosmosDbQueryCostMetricRecorder();
// Verify OpenTelemetry meter is configured
.WithMetrics(metrics => metrics.AddMeter("Diginsight.Components.Azure"));
// Use ReadNextObservableAsync() for automatic cost tracking
var response = await iterator.ReadNextObservableAsync();Debugging
Enable Detailed Logging:
// appsettings.Development.json
{
"Logging": {
"LogLevel": {
"Diginsight.Components.Azure": "Trace",
"Diginsight.Components.Azure.Extensions.CosmosDbObservableExtensions": "Debug"
}
}
}Activity Debugging:
// Add activity listeners for debugging
ActivitySource.AddActivityListener(new ActivityListener
{
ShouldListenTo = source => source.Name.StartsWith("Diginsight"),
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStarted = activity => Console.WriteLine($"Started: {activity.OperationName}"),
ActivityStopped = activity => Console.WriteLine($"Stopped: {activity.OperationName} ({activity.Duration.TotalMilliseconds}ms)")
});📚 Reference
Extension Methods Summary
| Method | Return Type | Purpose |
|---|---|---|
ToFeedIteratorObservable<T>() |
FeedIterator<T> |
Convert IQueryable to observable FeedIterator |
GetItemQueryIteratorObservable<T>() |
FeedIterator<T> |
Observable typed query iterator |
GetItemQueryStreamIteratorObservable() |
FeedIterator |
Observable stream query iterator |
GetItemLinqQueryableObservable<T>() |
IOrderedQueryable<T> |
Observable LINQ queryable |
CreateItemObservableAsync<T>() |
Task<ItemResponse<T>> |
Observable item creation |
ReadItemObservableAsync<T>() |
Task<ItemResponse<T>> |
Observable item reading |
UpsertItemObservableAsync<T>() |
Task<ItemResponse<T>> |
Observable item upsert |
ReplaceItemObservableAsync<T>() |
Task<ItemResponse<T>> |
Observable item replacement |
DeleteItemObservableAsync<T>() |
Task<ItemResponse<T>> |
Observable item deletion |
PatchItemObservableAsync<T>() |
Task<ItemResponse<T>> |
Observable item patching |
ReadManyItemsObservableAsync<T>() |
Task<FeedResponse<T>> |
Observable batch reading |
DeleteAllItemsByPartitionKeyStreamObservableAsync() |
Task<ResponseMessage> |
Observable partition deletion |
ReadNextObservableAsync<T>() |
Task<FeedResponse<T>> |
Observable feed iteration |
GetAsyncItems<T>() |
IAsyncEnumerable<T> |
Async enumerable conversion |
GetItemsAsync<T>() |
Task<IEnumerable<T>> |
Full result materialization |
Log Messages
Standard log message patterns with emojis for easy identification:
| Operation | Emoji | Pattern |
|---|---|---|
| Query | 🔍 | CosmosDB query for class '{Type}' in database {Endpoint}, container {Container} |
| Create | 📦 | CosmosDB create item for class '{Type}' in database {Endpoint}, container {Container} |
| Read | 🔍 | CosmosDB read item for id '{Id}' in database {Endpoint}, container {Container} |
| Upsert | 🔄 | CosmosDB upsert for class '{Type}' in database {Endpoint}, container {Container} |
| Replace | 🔄 | CosmosDB replace item for class '{Type}' with id '{Id}' |
| Delete | 🗑️ | CosmosDB delete item for class '{Type}' with id '{Id}' |
| Patch | ✂️ | CosmosDB patch item for class '{Type}' with id '{Id}' |
| Error | ❌ | Error {operation} in CosmosDB for type {Type}: {ErrorMessage} |
Entity Logging: - Entity data is logged using the Stringify() extension method for structured representation - Partition keys are logged with their full structure - Patch operations include detailed operation lists
Query Information: - Full query text is logged for debugging - Query parameters are included when using QueryDefinition - Continuation tokens and request options are logged when relevant
💡 Best Practices
Performance Considerations
1. Use Appropriate Method Variants
// For large result sets - use streaming
var streamIterator = container.GetItemQueryStreamIteratorObservable(query);
// For typed results with moderate size - use generic methods
var typedIterator = container.GetItemQueryIteratorObservable<User>(query);
// For small, complete result sets - use materialization
var iterator = container.GetItemQueryIteratorObservable<User>(query);
var allUsers = await iterator.GetItemsAsync(); // Only for small sets2. Optimize Request Options
// Optimize for performance
var options = new QueryRequestOptions
{
MaxItemCount = 1000, // Larger pages = fewer round trips
MaxConcurrency = 10, // Parallel execution
EnableScanInQuery = false, // Avoid scans when possible
PartitionKey = partitionKey // Single partition queries
};
var iterator = container.GetItemQueryIteratorObservable<User>(query, requestOptions: options);3. Use Async Enumeration for Memory Efficiency
// Memory efficient - processes items as they arrive
await foreach (var user in iterator.GetAsyncItems())
{
await ProcessUserAsync(user);
}
// Memory intensive - loads all items first
var allUsers = await iterator.GetItemsAsync();
foreach (var user in allUsers)
{
await ProcessUserAsync(user);
}Error Handling Patterns
1. Specific Exception Handling
try
{
var response = await container.ReadItemObservableAsync<User>("user123", partitionKey);
return response.Resource;
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
// Handle missing item (logged automatically)
return null;
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
// Handle rate limiting (logged automatically)
await Task.Delay(ex.RetryAfter ?? TimeSpan.FromSeconds(1));
return await GetUserWithRetryAsync(id, partitionKey);
}2. Retry Patterns with Observability
public async Task<T> ExecuteWithRetryAsync<T>(Func<Task<T>> operation, int maxRetries = 3)
{
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
return await operation();
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests && attempt < maxRetries)
{
// Rate limiting exception automatically logged by observable extension
var delay = ex.RetryAfter ?? TimeSpan.FromSeconds(Math.Pow(2, attempt));
await Task.Delay(delay);
}
}
// Final attempt without catch
return await operation();
}
// Usage
var user = await ExecuteWithRetryAsync(async () =>
{
var response = await container.ReadItemObservableAsync<User>("user123", partitionKey);
return response.Resource;
});Observability Guidelines
1. Use Meaningful Operation Names
The observable extensions automatically use the method name as the activity name. For custom operations, consider wrapping calls:
public async Task<User> GetUserProfileAsync(string userId)
{
using var activity = Observability.ActivitySource.StartMethodActivity(logger, () => new { userId });
activity?.SetTag("user.id", userId);
var response = await container.ReadItemObservableAsync<User>(
userId,
new PartitionKey("users")
);
return response.Resource;
}2. Configure Query Cost Tracking
// Enable query cost metrics for performance monitoring
services.AddCosmosDbQueryCostMetricRecorder(options =>
{
options.AddQueryCallers = 1; // Track immediate caller
options.IgnoreQueryCallers = new[] // Skip infrastructure methods
{
"*Repository.Get*",
"CosmosDbObservableExtensions.*"
};
});3. Monitor Key Metrics
Track these metrics for CosmosDB health: - Query Cost (diginsight.query_cost): Request Unit consumption - Query Duration: Response time patterns
- Error Rates: By operation type and error code - Throughput: Operations per second by container
This documentation covers CosmosDbObservableExtensions v1.0+. For the latest updates and examples, see the Diginsight Components repository.